---
id: mqttclient
title: Mqtt客户端使用
---

import Tag from "@site/src/components/Tag.js";

### 定义

命名空间：TouchSocket.Mqtt <br/>
程序集：[TouchSocket.Mqtt.dll](https://www.nuget.org/packages/TouchSocket.Mqtt)

## 一、说明

`MqttClient`是遵循Mqtt协议的消息客户端，支持连接Mqtt Broker、订阅主题、发布消息等功能，支持QoS 0/1/2三种消息质量等级。

## 二、特点

- 轻量级协议支持
- QoS消息质量保障
- 遗嘱消息支持
- 保留消息处理
- TLS加密通信
- 主题通配符匹配（+/#）

## 三、应用场景

- IoT设备数据上报
- 跨平台消息推送
- 低带宽环境通信
- 设备状态同步

## 四、可配置项

继承所有[TcpClient](./tcpclient.mdx)的配置。除此之外还支持Mqtt的一些专有配置。

<details>
<summary>可配置项</summary>
<div>

#### SetMqttConnectOptions

`MqttConnectOptions` 类用于配置 Mqtt 客户端连接到 Mqtt 服务端时的参数，支持 Mqtt 3.1.1 及 5.0+ 协议版本。以下是各配置项的详细说明：

**基础连接选项（通用）**

| 属性名                  | Mqtt 版本支持       | 说明                                                                                     |
|-------------------------|---------------------|------------------------------------------------------------------------------------------|
| `CleanSession`          | 3.1.1+              | 是否清理会话（断开后删除未完成的遗嘱和离线消息）。                                       |
| `ClientId`              | 3.1.1+              | 客户端唯一标识符（服务端用于识别客户端）；Mqtt 3.1.1 中必填（长度 1-23 字节，ASCII 字符）；5.0+ 允许空字符串（仅当服务端允许时）。 |
| `KeepAlive`             | 3.1.1+              | 心跳保活时长（单位：秒），表示客户端与服务端保持连接的最长时间；需与服务端配置兼容（服务端可能拒绝过大或过小的值），默认值通常为 60 秒。 |
| `Password`              | 3.1.1+              | 连接认证密码（与 `UserName` 配合使用）；若 `UserName` 为空则无效；5.0+ 支持二进制数据（需通过 `AuthenticationData` 传递）。 |
| `ProtocolName`          | 3.1.1+              | Mqtt 协议名称（固定为 "MQTT"），默认值为 "MQTT"，通常无需手动设置。                       |
| `UserName`              | 3.1.1+              | 连接认证用户名（与 `Password` 配合使用）；5.0+ 支持二进制数据（需通过 `AuthenticationData` 传递）。                     |
| `Version`               | 3.1.1+/5.0+         | Mqtt 协议版本（如 `MqttProtocolVersion.V311` 或 `V500`）；需根据服务端支持的版本设置，否则可能导致连接失败。               |

---

**遗嘱（Will）消息配置（可选）**

当 `WillFlag` 为 `true` 时，服务端会在客户端异常断开时发布遗嘱消息。仅部分属性在 `WillFlag=true` 时生效。

| 属性名                      | Mqtt 版本支持       | 说明                                                                                     |
|-----------------------------|---------------------|------------------------------------------------------------------------------------------|
| `WillFlag`                  | 3.1.1+              | 是否启用遗嘱消息（若为 `true`，需配置遗嘱相关属性）。                                      |
| `WillQos`                   | 3.1.1+              | 遗嘱消息的服务质量等级（0/1/2）；需与服务端支持的 QoS 等级兼容。                           |
| `WillRetain`                | 3.1.1+              | 是否保留遗嘱消息（服务端保留最新遗嘱消息供新订阅者接收）；默认值为 `false`；若为 `true`，服务端需支持保留消息功能。       |
| `WillMessage`               | 3.1.1+              | 遗嘱消息的内容（文本类型，Mqtt 5.0+ 支持二进制数据，需通过 `WillContentType` 和 `WillPayloadFormatIndicator` 配合）。       |
| `WillTopic`                 | 3.1.1+              | 遗嘱消息发布的主题（需符合 Mqtt 主题命名规则）；主题需提前订阅才能接收遗嘱消息。           |
| `WillContentType`           | 5.0+                | 遗嘱消息内容的 MIME 类型（如 "text/plain"、"application/json"）；描述 `WillMessage` 的格式（仅 5.0+ 有效）。             |
| `WillPayloadFormatIndicator`| 5.0+                | 遗嘱消息负载格式指示符（如 `Raw`、`Utf8`）；更细粒度描述负载格式（仅 5.0+ 有效）。                                       |
| `WillResponseTopic`         | 5.0+                | 遗嘱消息触发后，服务端向客户端发送响应的主题；需客户端提前订阅该主题以接收响应。                                         |
| `WillCorrelationData`       | 5.0+                | 遗嘱消息关联的上下文数据（用于匹配请求与响应）；需与服务端约定格式（仅 5.0+ 有效）。                                     |
| `WillDelayInterval`         | 5.0+                | 遗嘱消息延迟发布的时间间隔（单位：秒）；遗嘱触发后，延迟指定时间再发布（仅 5.0+ 有效）。                                 |
| `WillMessageExpiryInterval` | 5.0+                | 遗嘱消息的有效期（单位：秒，过期后服务端删除）；超过此时间未发布的遗嘱消息将被丢弃（仅 5.0+ 有效）。                     |

---

**Mqtt 5.0+ 扩展配置（可选）**

仅当使用 Mqtt 5.0+ 协议时生效，用于更细粒度的连接控制。

| 属性名                      | Mqtt 版本支持       | 说明                                                                                     |
|-----------------------------|---------------------|------------------------------------------------------------------------------------------|
| `UserProperties`            | 5.0+                | 用户自定义属性（键值对，用于传递业务元数据）；键值对需符合 Mqtt 5.0 规范（键为 UTF-8 字符串，值支持多种类型）。           |
| `AuthenticationMethod`      | 5.0+                | 认证方法名称（如 "OAuth2"、"Custom"）；需与服务端支持的认证方法一致（仅 5.0+ 支持扩展认证）。                           |
| `AuthenticationData`        | 5.0+                | 认证方法的附加数据（如令牌、签名等）；配合 `AuthenticationMethod` 使用（仅 5.0+ 有效）。                               |
| `MaximumPacketSize`         | 5.0+                | 客户端请求的最大数据包大小（单位：字节）；服务端会返回其支持的最大值（客户端需遵守）。                                 |
| `ReceiveMaximum`            | 5.0+                | 客户端能接收的最大 QoS 1/2 消息数量（防止消息堆积）；默认值通常为 65535（需与服务端配置兼容）。                           |
| `RequestProblemInformation` | 5.0+                | 是否请求服务端在出错时返回详细问题信息（如原因码、原因字符串）；默认值为 `true`（建议开启以便排查问题）。                 |
| `RequestResponseInformation`| 5.0+                | 是否请求服务端返回连接响应的额外信息（如会话过期时间）；默认值为 `false`（仅当需要时启用）。                             |
| `SessionExpiryInterval`     | 5.0+                | 会话过期时间间隔（单位：秒，0 表示立即过期）；客户端断开后，服务端保留会话的时间（仅当 `CleanSession=false` 时有效）。       |
| `TopicAliasMaximum`         | 5.0+                | 客户端允许的最大主题别名值（服务站不能使用超过此值的别名）；默认值为 0（表示不使用主题别名）。                           |

---

**注意事项**

1. **协议版本兼容性**：部分属性（如 `UserProperties`、`AuthenticationMethod`）仅在 Mqtt 5.0+ 中有效，需根据服务端版本选择配置。
2. **遗嘱消息生效条件**：`WillFlag` 必须为 `true`，且 `WillTopic` 和 `WillMessage` 需有效配置，否则遗嘱不会被发送。
3. **认证扩展**：5.0+ 的 `AuthenticationData` 需配合自定义认证插件使用，具体格式由服务端定义。
4. **只读属性**：`WillMessage` 和 `WillTopic` 标记为 `internal set`，通常通过构造函数或专用方法设置（如 `MqttApplicationMessage`）。

</div>
</details>

## 五、支持插件

|  插件方法| 功能 |
| --- | --- |
| IMqttConnectingPlugin | 当Mqtt客户端正在连接之前调用此方法。 |
| IMqttConnectedPlugin | 当Mqtt客户端连接成功时调用。 |
| IMqttClosingPlugin | 当Mqtt客户端正在关闭时调用。|
| IMqttClosedPlugin | 当Mqtt客户端断开连接后触发。 |
| IMqttReceivingPlugin | 在收到Mqtt所有消息时触发，可以通过`e.MqttMessage`获取到`Mqtt`的所有消息，包括订阅、订阅确认、发布、发布确认等。 |
| IMqttReceivedPlugin | 当接收到Mqtt发布消息，且成功接收时触发。可以通过`e.MqttMessage`获取到`Mqtt`的发布消息。 |


## 六、创建Mqtt客户端

```csharp showLineNumbers
var client = new MqttTcpClient();
await client.SetupAsync(new TouchSocketConfig()
    .SetRemoteIPHost("tcp://127.0.0.1:7789")
    .SetMqttConnectOptions(options =>
    {
        options.ClientId = "TestClient";
        options.UserName = "TestUser";
        options.Password = "TestPassword";
        options.ProtocolName = "MQTT";
        options.Version = MqttProtocolVersion.V311;
        options.KeepAlive = 60;
        options.CleanSession = true;

        // v5可以继续配置其他

        // options.UserProperties = new[]
        // {
        //     new MqttUserProperty("key1","value1"),
        //     new MqttUserProperty("key2","value2")
        // };
    })
    .ConfigurePlugins(a =>
    {
        a.AddMqttConnectingPlugin(async (mqttSession, e) =>
        {
            Console.WriteLine($"Client Connecting:{e.ConnectMessage.ClientId}");
            await e.InvokeNext();
        });

        a.AddMqttConnectedPlugin(async (mqttSession, e) =>
        {
            Console.WriteLine($"Client Connected:{e.ConnectMessage.ClientId}");
            await e.InvokeNext();
        });

        a.AddMqttReceivingPlugin(async (mqttSession, e) =>
        {
            await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
        });

        a.AddMqttReceivedPlugin(async (mqttSession, e) =>
        {
            var message = e.Message;
            await e.InvokeNext().ConfigureAwait(EasyTask.ContinueOnCapturedContext);
        });

        a.AddMqttClosingPlugin(async (mqttSession, e) =>
        {
            Console.WriteLine($"Client Closing:{e.MqttMessage.MessageType}");
            await e.InvokeNext();
        });

        a.AddMqttClosedPlugin(async (mqttSession, e) =>
        {
            Console.WriteLine($"Client Closed:{e.Message}");
            await e.InvokeNext();
        });

    }));//载入配置

await client.ConnectAsync();//连接
```


## 七、订阅与取消订阅

### 7.1 订阅

按照Mqtt协议，客户端可以通过`Subscribe`指令订阅一个或多个主题，服务端会返回订阅结果。

```csharp {10} showLineNumbers
var topic1 = "topic1";
var topic2 = "topic2";
SubscribeRequest subscribeRequest1=new SubscribeRequest(topic1, QosLevel.AtLeastOnce);//订阅请求
SubscribeRequest subscribeRequest2=new SubscribeRequest(topic2, QosLevel.AtMostOnce);//可以设置不同的Qos级别

//多个订阅请求
var mqttSubscribeMessage = new MqttSubscribeMessage(subscribeRequest1, subscribeRequest2);

//执行订阅
var mqttSubAckMessage = await client.SubscribeAsync(mqttSubscribeMessage);

//输出订阅结果
foreach (var item in mqttSubAckMessage.ReturnCodes)
{
    Console.WriteLine($"ReturnCode:{item}");
}
```

### 7.2 取消订阅

按照Mqtt协议，客户端可以通过`Unsubscribe`指令取消订阅一个或多个主题，服务端会返回所有取消订阅结果。

```csharp {5} showLineNumbers
var topic1 = "topic1";
var topic2 = "topic2";

//取消订阅
var mqttUnsubAckMessage = await client.UnsubscribeAsync(new MqttUnsubscribeMessage(topic1,topic2));
```


## 八、接收消息

Mqtt组件的消息都是通过插件抛出的。你可以订阅`IMqttReceivingPlugin`插件和`IMqttReceivedPlugin`插件来接收消息。

正如插件说明所示，`IMqttReceivingPlugin`插件可以在收到所有消息时触发，`IMqttReceivedPlugin`插件可以在收到发布消息时触发。

所以如果你只关心发布成功的消息，那么可以只订阅`IMqttReceivedPlugin`插件即可。

你可以直接使用委托实现订阅：

```csharp {3-17} showLineNumbers
.ConfigurePlugins(a =>
{
    a.AddMqttReceivedPlugin(async (client, e) =>
    {
        var mqttMessage = e.MqttMessage;
        Console.WriteLine("Reved:" + mqttMessage);

        //订阅消息的主题
        var topicName = mqttMessage.TopicName;

        //订阅消息的Qos级别
        var qosLevel = mqttMessage.QosLevel;

        //订阅消息的Payload
        var payload = mqttMessage.Payload;
        await e.InvokeNext();
    });
})
```

或者使用插件类来继承接口实现：

```csharp showLineNumbers
class MyMqttReceivedPlugin : PluginBase, IMqttReceivedPlugin
{
    public async Task OnMqttReceived(IMqttSession client, MqttReceivedEventArgs e)
    {
        var mqttMessage = e.MqttMessage;

        //订阅消息的主题
        var topicName = mqttMessage.TopicName;

        //订阅消息的Qos级别
        var qosLevel = mqttMessage.QosLevel;

        //订阅消息的Payload
        var payload = mqttMessage.Payload;
        await e.InvokeNext();
    }
}
```

然后把这个类添加到插件即可：

```csharp {3} showLineNumbers
.ConfigurePlugins(a =>
{
    a.Add<MyMqttReceivedPlugin>();//添加自定义插件
})
```

## 九、发布消息

```csharp {3} showLineNumbers
MqttPublishMessage message = new(topic1, false, QosLevel.AtLeastOnce, Encoding.UTF8.GetBytes("Hello World"));

await client.PublishAsync(message);
```


